package com.atguigu.gmall.activity.receiver;

import com.atguigu.gmall.activity.model.SeckillGoods;
import com.atguigu.gmall.activity.model.UserRecode;
import com.atguigu.gmall.activity.service.SeckillGoodsService;
import com.atguigu.gmall.common.constant.RedisConst;
import com.atguigu.gmall.common.rabbit.config.MqConst;
import com.atguigu.gmall.common.util.DateUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.rabbitmq.client.Channel;
import javafx.print.Collation;
import lombok.SneakyThrows;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.BoundHashOperations;
import org.springframework.data.redis.core.BoundListOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.sql.Time;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
 * RabbitMQ listeners for the seckill (flash-sale) activity service.
 *
 * <p>Each listener acknowledges its delivery explicitly through the {@link Channel}.
 * NOTE(review): this assumes the listener container runs in manual acknowledge mode —
 * confirm against the application's RabbitMQ configuration.
 *
 * <p>When processing fails, the failure is logged and the delivery is negatively
 * acknowledged without requeue, so it neither stays unacknowledged on the channel nor
 * loops forever as a poison message. Whether the broker then dead-letters or drops the
 * message depends on the queue configuration, which is not visible here.
 *
 * @author: atguigu
 * @create: 2023-06-30 15:41
 */
@Slf4j
@Component
public class ActivityReceiver {

    @Autowired
    private SeckillGoodsService seckillGoodsService;

    @Autowired
    private RedisTemplate redisTemplate;


    /**
     * Listens for the "seckill goods go on sale" task message and loads today's
     * seckill goods into the Redis cache.
     *
     * @param channel channel used to acknowledge the delivery
     * @param message raw AMQP message; only its delivery tag is read
     */
    @SneakyThrows
    @RabbitListener(bindings = @QueueBinding(
            exchange = @Exchange(value = MqConst.EXCHANGE_DIRECT_TASK, durable = "true"),
            value = @Queue(value = MqConst.QUEUE_TASK_1, durable = "true"),
            key = MqConst.ROUTING_TASK_1
    ))
    public void processSeckillUpper(Channel channel, Message message) {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            cacheTodaySeckillGoods();
        } catch (Exception e) {
            log.error("[秒杀服务]秒杀商品上架缓存异常", e);
            // Reject without requeue so the delivery does not stay unacknowledged.
            channel.basicNack(deliveryTag, false, false);
            return;
        }
        channel.basicAck(deliveryTag, false);
    }


    /**
     * Listens on the seckill user queue and processes a single seckill request.
     *
     * <p>A Redis marker keyed by user id and sku id (5 minute TTL) makes repeated
     * deliveries of the same request a no-op.
     * NOTE(review): the marker is not released when processing fails, so an identical
     * request re-sent within the TTL is skipped — confirm this is intended.
     *
     * @param userRecode the user's seckill request; a {@code null} payload is acknowledged and ignored
     * @param channel    channel used to acknowledge the delivery
     * @param message    raw AMQP message; only its delivery tag is read
     */
    @SneakyThrows
    @RabbitListener(bindings = @QueueBinding(
            exchange = @Exchange(value = MqConst.EXCHANGE_DIRECT_SECKILL_USER, durable = "true"),
            value = @Queue(value = MqConst.QUEUE_SECKILL_USER, durable = "true"),
            key = MqConst.ROUTING_SECKILL_USER
    ))
    public void processSeckillReq(UserRecode userRecode, Channel channel, Message message) {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            if (userRecode != null) {
                // 1. Idempotency guard: only the first delivery within the TTL is processed.
                String key = "seckill:" + userRecode.getUserId() + ":" + userRecode.getSkuId() + ":req";
                if (markFirstDelivery(key)) {
                    log.info("[秒杀服务]处理秒杀请求:{}", userRecode);
                    // 2. Run the seckill business logic.
                    seckillGoodsService.processSeckill(userRecode);
                }
            }
        } catch (Exception e) {
            log.error("[秒杀服务]处理秒杀请求异常:{}", userRecode, e);
            channel.basicNack(deliveryTag, false, false);
            return;
        }
        channel.basicAck(deliveryTag, false);
    }


    /**
     * Listens for the "deduct stock after a successful seckill" message.
     *
     * <p>A Redis marker keyed by user id and sku id (5 minute TTL) prevents the stock
     * from being deducted more than once for repeated deliveries.
     * NOTE(review): the marker is not released when the deduction fails — confirm this is intended.
     *
     * @param userRecode the user's seckill request; a {@code null} payload is acknowledged and ignored
     * @param channel    channel used to acknowledge the delivery
     * @param message    raw AMQP message; only its delivery tag is read
     */
    @SneakyThrows
    @RabbitListener(bindings = @QueueBinding(
            exchange = @Exchange(value = MqConst.EXCHANGE_DIRECT_SECKILL_STOCK, durable = "true"),
            value = @Queue(value = MqConst.QUEUE_SECKILL_STOCK, durable = "true"),
            key = MqConst.ROUTING_SECKILL_STOCK
    ))
    public void processDedcutStock(UserRecode userRecode, Channel channel, Message message) {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            if (userRecode != null) {
                // 1. Idempotency guard: avoid deducting the stock multiple times.
                String key = "seckill:" + userRecode.getUserId() + ":" + userRecode.getSkuId() + ":deductstock";
                if (markFirstDelivery(key)) {
                    log.info("[秒杀服务]处理秒杀扣减库存:{}", userRecode);
                    // 2. Run the stock deduction logic.
                    seckillGoodsService.processDedcutStock(userRecode);
                }
            }
        } catch (Exception e) {
            log.error("[秒杀服务]处理秒杀扣减库存异常:{}", userRecode, e);
            channel.basicNack(deliveryTag, false, false);
            return;
        }
        channel.basicAck(deliveryTag, false);
    }


    /**
     * Listens for the scheduled task message that triggers cleanup of the seckill cache.
     *
     * @param channel channel used to acknowledge the delivery
     * @param message raw AMQP message; only its delivery tag is read
     */
    @SneakyThrows
    @RabbitListener(bindings = @QueueBinding(
            exchange = @Exchange(value = MqConst.EXCHANGE_DIRECT_TASK, durable = "true"),
            value = @Queue(value = MqConst.QUEUE_TASK_18, durable = "true"),
            key = MqConst.ROUTING_TASK_18
    ))
    public void processCleanCache(Channel channel, Message message) {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            seckillGoodsService.processCleanCache();
        } catch (Exception e) {
            log.error("[秒杀服务]清理秒杀缓存异常", e);
            channel.basicNack(deliveryTag, false, false);
            return;
        }
        channel.basicAck(deliveryTag, false);
    }

    /**
     * Queries today's seckill goods and caches every one that is not cached yet.
     *
     * <p>For each newly cached sku this stores the goods in the {@code RedisConst.SECKILL_GOODS}
     * hash, pushes one list entry per unit of stock under the sku's stock key, and publishes
     * {@code "<skuId>:1"} on the {@code seckillpush} Redis topic.
     */
    private void cacheTodaySeckillGoods() {
        // 1. Query goods with status "1" (approved, per the original author's note),
        //    stock greater than zero, and a start time that falls on today.
        LambdaQueryWrapper<SeckillGoods> queryWrapper = new LambdaQueryWrapper<>();
        queryWrapper.eq(SeckillGoods::getStatus, "1");
        queryWrapper.gt(SeckillGoods::getStockCount, 0);
        // presumably formatted as yyyy-MM-dd to match the DATE_FORMAT pattern below — verify DateUtil
        String today = DateUtil.formatDate(new Date());
        // {0} binds the value as a statement parameter instead of concatenating it into the SQL.
        queryWrapper.apply("DATE_FORMAT(start_time, '%Y-%m-%d') = {0}", today);
        List<SeckillGoods> seckillGoodsList = seckillGoodsService.list(queryWrapper);
        if (CollectionUtils.isEmpty(seckillGoodsList)) {
            return;
        }

        // 2. Cache each goods entry.  TODO: keep the goods status flag in a local cache
        BoundHashOperations<String, String, SeckillGoods> seckillHashOps =
                redisTemplate.boundHashOps(RedisConst.SECKILL_GOODS);
        for (SeckillGoods seckillGoods : seckillGoodsList) {
            String skuId = seckillGoods.getSkuId().toString();
            // hasKey may return null inside a pipeline/transaction; compare null-safely.
            if (Boolean.TRUE.equals(seckillHashOps.hasKey(skuId))) {
                continue;
            }
            // 2.1 Store the goods details in the hash.
            seckillHashOps.put(skuId, seckillGoods);
            // 2.2 Push one list element per unit of stock for this sku.
            BoundListOperations<String, String> stockListOps =
                    redisTemplate.boundListOps(RedisConst.SECKILL_STOCK_PREFIX + skuId);
            for (int i = 0; i < seckillGoods.getStockCount(); i++) {
                stockListOps.leftPush(skuId);
            }
            // 3. Publish the status flag for this sku on the "seckillpush" topic.
            redisTemplate.convertAndSend("seckillpush", skuId + ":1");
        }
    }

    /**
     * Atomically sets an idempotency marker in Redis with a 5 minute TTL.
     *
     * @param key marker key identifying the unit of work
     * @return {@code true} only when this call created the marker; {@code false} when it
     *         already existed or Redis returned no result (null-safe)
     */
    private boolean markFirstDelivery(String key) {
        Boolean created = redisTemplate.opsForValue().setIfAbsent(key, "", 5, TimeUnit.MINUTES);
        return Boolean.TRUE.equals(created);
    }
}
